# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'logstash/namespace'
require 'logstash/outputs/base'
require 'jruby-kafka'

# Write events to a Kafka topic. This uses the Kafka Producer API to write messages to a topic on
# the broker.
#
# The only required configuration is the topic name. The default codec is json,
# so events will be persisted on the broker in json format. If you select a codec of plain,
# Logstash will encode your messages with not only the message but also with a timestamp and
# hostname. If you do not want anything but your message passing through, you should make the output
# configuration something like:
# [source,ruby]
#     output {
#       kafka {
#         codec => plain {
#            format => "%{message}"
#         }
#       }
#     }
# For more information see http://kafka.apache.org/documentation.html#theproducer
#
# Kafka producer configuration: http://kafka.apache.org/documentation.html#producerconfigs
class LogStash::Outputs::Kafka < LogStash::Outputs::Base
  config_name 'kafka'
  milestone 2

  default :codec, 'json'
  # This is for bootstrapping and the producer will only use it for getting metadata (topics,
  # partitions and replicas). The socket connections for sending the actual data will be
  # established based on the broker information returned in the metadata. The format is
  # `host1:port1,host2:port2`, and the list can be a subset of brokers or a VIP pointing to a
  # subset of brokers.
  config :broker_list, :validate => :string, :default => 'localhost:9092'
  # The topic to produce the messages to
  config :topic_id, :validate => :string, :required => true
  # This parameter allows you to specify the compression codec for all data generated by this
  # producer. Valid values are `none`, `gzip` and `snappy`.
  config :compression_codec, :validate => %w( none gzip snappy ), :default => 'none'
  # This parameter allows you to set whether compression should be turned on for particular
  # topics. If the compression codec is anything other than `NoCompressionCodec`,
  # enable compression only for specified topics if any. If the list of compressed topics is
  # empty, then enable the specified compression codec for all topics. If the compression codec
  # is `NoCompressionCodec`, compression is disabled for all topics
  config :compressed_topics, :validate => :string, :default => ''
  # This value controls when a produce request is considered completed. Specifically,
  # how many other brokers must have committed the data to their log and acknowledged this to the
  # leader. For more info, see -- http://kafka.apache.org/documentation.html#producerconfigs
  config :request_required_acks, :validate => [-1,0,1], :default => 0
  # The serializer class for messages. The default encoder takes a byte[] and returns the same byte[]
  config :serializer_class, :validate => :string, :default => 'kafka.serializer.StringEncoder'
  # The partitioner class for partitioning messages amongst partitions in the topic. The default
  # partitioner is based on the hash of the key. If the key is null,
  # the message is sent to a random partition in the broker.
  # NOTE: `topic_metadata_refresh_interval_ms` controls how long the producer will distribute to a
  # partition in the topic. This defaults to 10 mins, so the producer will continue to write to a
  # single partition for 10 mins before it switches
  config :partitioner_class, :validate => :string, :default => 'kafka.producer.DefaultPartitioner'
  # The amount of time the broker will wait trying to meet the `request.required.acks` requirement
  # before sending back an error to the client.
  config :request_timeout_ms, :validate => :number, :default => 10000
  # This parameter specifies whether the messages are sent asynchronously in a background thread.
  # Valid values are (1) async for asynchronous send and (2) sync for synchronous send. By
  # setting the producer to async we allow batching together of requests (which is great for
  # throughput) but open the possibility of a failure of the client machine dropping unsent data.
  config :producer_type, :validate => %w( sync async ), :default => 'sync'
  # The serializer class for keys (defaults to the same as for messages if nothing is given)
  config :key_serializer_class, :validate => :string, :default => 'kafka.serializer.StringEncoder'
  # This property will cause the producer to automatically retry a failed send request. This
  # property specifies the number of retries when such failures occur. Note that setting a
  # non-zero value here can lead to duplicates in the case of network errors that cause a message
  # to be sent but the acknowledgement to be lost.
  config :message_send_max_retries, :validate => :number, :default => 3
  # Before each retry, the producer refreshes the metadata of relevant topics to see if a new
  # leader has been elected. Since leader election takes a bit of time,
  # this property specifies the amount of time that the producer waits before refreshing the
  # metadata.
  config :retry_backoff_ms, :validate => :number, :default => 100
  # The producer generally refreshes the topic metadata from brokers when there is a failure
  # (partition missing, leader not available...). It will also poll regularly (default: every
  # 10min so 600000ms). If you set this to a negative value, metadata will only get refreshed on
  # failure. If you set this to zero, the metadata will get refreshed after each message sent
  # (not recommended). Important note: the refresh happen only AFTER the message is sent,
  # so if the producer never sends a message the metadata is never refreshed
  config :topic_metadata_refresh_interval_ms, :validate => :number, :default => 600 * 1000
  # Maximum time to buffer data when using async mode. For example a setting of 100 will try to
  # batch together 100ms of messages to send at once. This will improve throughput but adds
  # message delivery latency due to the buffering.
  config :queue_buffering_max_ms, :validate => :number, :default => 5000
  # The maximum number of unsent messages that can be queued up the producer when using async
  # mode before either the producer must be blocked or data must be dropped.
  config :queue_buffering_max_messages, :validate => :number, :default => 10000
  # The amount of time to block before dropping messages when running in async mode and the
  # buffer has reached `queue.buffering.max.messages`. If set to 0 events will be enqueued
  # immediately or dropped if the queue is full (the producer send call will never block). If set
  # to -1 the producer will block indefinitely and never willingly drop a send.
  config :queue_enqueue_timeout_ms, :validate => :number, :default => -1
  # The number of messages to send in one batch when using async mode. The producer will wait
  # until either this number of messages are ready to send or `queue.buffer.max.ms` is reached.
  config :batch_num_messages, :validate => :number, :default => 200
  # Socket write buffer size
  config :send_buffer_bytes, :validate => :number, :default => 100 * 1024
  # The client id is a user-specified string sent in each request to help trace calls. It should
  # logically identify the application making the request.
  config :client_id, :validate => :string, :default => ''
  # Provides a way to specify a partition key as a string. To specify a partition key for
  # Kafka, configure a format that will produce the key as a string. Defaults
  # `key_serializer_class` to `kafka.serializer.StringEncoder` to match. For example, to partition
  # by host:
  # [source,ruby]
  #     output {
  #       kafka {
  #           partition_key_format => "%{host}"
  #       }
  #     }
  config :partition_key_format, :validate => :string, :default => nil

  public
  def register
    LogStash::Logger.setup_log4j(@logger)

    options = {
        :broker_list => @broker_list,
        :compression_codec => @compression_codec,
        :compressed_topics => @compressed_topics,
        :request_required_acks => @request_required_acks,
        :serializer_class => @serializer_class,
        :partitioner_class => @partitioner_class,
        :request_timeout_ms => @request_timeout_ms,
        :producer_type => @producer_type,
        :key_serializer_class => @key_serializer_class,
        :message_send_max_retries => @message_send_max_retries,
        :retry_backoff_ms => @retry_backoff_ms,
        :topic_metadata_refresh_interval_ms => @topic_metadata_refresh_interval_ms,
        :queue_buffering_max_ms => @queue_buffering_max_ms,
        :queue_buffering_max_messages => @queue_buffering_max_messages,
        :queue_enqueue_timeout_ms => @queue_enqueue_timeout_ms,
        :batch_num_messages => @batch_num_messages,
        :send_buffer_bytes => @send_buffer_bytes,
        :client_id => @client_id
    }
    @producer = Kafka::Producer.new(options)
    @producer.connect

    @logger.info('Registering kafka producer', :topic_id => @topic_id, :broker_list => @broker_list)

    @codec.on_event do |data|
      begin
        @producer.send_msg(@current_topic_id,@partition_key,data)
      rescue LogStash::ShutdownSignal
        @logger.info('Kafka producer got shutdown signal')
      rescue => e
        @logger.warn('kafka producer threw exception, restarting',
                     :exception => e)
      end
    end
  end # def register

  def receive(event)
    return unless output?(event)
    if event == LogStash::SHUTDOWN
      finished
      return
    end
    @partition_key = if @partition_key_format.nil? then nil else event.sprintf(@partition_key_format) end
    @current_topic_id = if @topic_id.nil? then nil else event.sprintf(@topic_id) end
    @codec.encode(event)
    @partition_key = nil
    @current_topic_id = nil
  end

  def teardown
    @producer.close
  end
end #class LogStash::Outputs::Kafka

